#include "config.h"

#include <sys/types.h>
#include <regex.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <sys/vfs.h>
#include <unistd.h>
#include <getopt.h>
#include <dirent.h>
#include <sys/statvfs.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "sysy_lib.h"
#include "metadata.h"
#include "configure.h"
#include "diskmap.h"
#include "node.h"
#include "nodetable.h"
#include "nodeid.h"
#include "net_global.h"
#include "net_table.h"
#include "ylog.h"
#include "dbg.h"
#include "bh.h"
#include "pool_list.h"
#include "../dispatch/dispatch.h"
#include "timer.h"

#define NODETSBLE_DFINFO_NONE "0:"

/*
 * Cursor handed to the hash-table iteration callback that serialises the
 * node list into a file (see __nodetable_list_to_file()).
 */
typedef struct {
        /* only referenced by the commented-out __nodetable_list() in this file */
        struct dirent *de;
        /* file descriptor the records are written to */
        int fd;
        /* byte offset at which the next record is written */
        int offset;
        /* only referenced by the commented-out __nodetable_list() in this file */
        int left;
} arg_t;

/* The node table: one entry_t per known node, protected by a rwlock. */
typedef struct {
        // map: hostname -> entry_t
        hashtable_t tab;
        /* taken around lookups and modifications of tab */
        sy_rwlock_t lock;
        /* set to 1 by nodetable_init(), cleared by nodetable_destroy() */
        int inited;
        /* number of entries; incremented in __nodetable_add(), decremented in nodetable_del() */
        int count;
} nodetable_t;

/*
 * One node in the table. Entries are allocated with room for the node name
 * directly after the struct (sizeof(entry_t) + strlen(name) + 1).
 */
typedef struct {
        nodestat_t stat;
        /* write-locked by nodetable_update_local_vols() while local_vols is changed */
        sy_rwlock_t lock;
        // volumes that already exist on the node (keyed by chkid, values are entry_vol_t)
        hashtable_t local_vols;
        /* not accessed in the visible part of this file */
        char status[NODE_STATUS_MAX];
        // pools on the node, in string form
        char dfinfo[MAX_BUF_LEN];
        /* NUL-terminated node name; also used as the hash key of the entry */
        char name[0];
} entry_t;

/* A volume recorded in entry_t.local_vols. */
typedef struct {
        /* volume id; used as the hash key */
        chkid_t chkid;
        /* gettime() value of the last add/sync notification that mentioned this volume */
        time_t last_notifiction;
} entry_vol_t;

/* the single node table instance used by every function in this file */
static nodetable_t nodetable;
/* gettime() value recorded by the last successful nodetable_offline() */
static time_t __last_reset__ = 0;
/* defined elsewhere; this file only reads __admin__->uptime */
extern admin_t *__admin__;

/* defined further down; declared here because __nodetable_load() calls it */
static int __nodetable_add(const char *name, const nodestat_t *stat, const char *dfinfo);

/* Node-table comparator: compares a stored entry_t's name with a name key. */
static int __cmp(const void *v1, const void *v2)
{
        const char *stored = ((const entry_t *)v1)->name;

        return strcmp(stored, (const char *)v2);
}

/* Node-table hash function: hashes the node-name string used as key. */
static uint32_t __key(const void *i)
{
        char *node_name = (char *)i;

        return hash_str(node_name);
}

/* Hash function for entry_t.local_vols: hashes the raw bytes of a chkid_t key. */
static uint32_t __hash_local_vol_key(const void *i)
{
        uint32_t hash;

        hash = hash_mem(i, sizeof(chkid_t));

        return hash;
}

/* Comparator for entry_t.local_vols: stored entry_vol_t versus a chkid_t key. */
static int __hash_local_vol_cmp(const void *v1, const void *v2)
{
        const entry_vol_t *stored = v1;

        return chkid_cmp(&stored->chkid, (const chkid_t *)v2);
}

/**
 * Fill the node table from the node list stored under ETCD_NODE.
 *
 * For every listed key the stored text is converted to a nid, the id is
 * marked as used, and an entry with status 0 is added to the table.
 *
 * @return 0 on success or when etcd reports ENOKEY (empty list); otherwise
 *         the error of the failing call.
 */
static int __nodetable_load()
{
        int ret, i, retry;
        etcd_node_t *list = NULL, *node;
        char tmp[MAX_NAME_LEN], buf[MAX_BUF_LEN];
        nodeinfo_t info;
        nodestat_t stat;
        nid_t nid;


        ret = etcd_list(ETCD_NODE, &list);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        /* nothing registered yet: not an error */
                        DINFO("node table empty\n");
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        for(i = 0; i < list->num_node; i++) {
                node = list->nodes[i];

                retry = 0;
        retry:
                ret = etcd_get_text(ETCD_NODE, node->key, tmp, NULL);
                if (unlikely(ret)) {
                        DERROR("%s not found\n", node->key);
                        /* NOTE(review): presumably sleeps and jumps back to the
                         * retry label a bounded number of times before leaving
                         * via err_free -- confirm against the macro definition */
                        USLEEP_RETRY(err_free, ret, retry, retry, 30, (1000*100));
                }

                str2nid(&nid, tmp);
                DINFO("node %s nid %u\n", node->key, nid.id);

                /* a failure to reserve the id is treated as fatal */
                ret = nodeid_used(nid.id);
                YASSERT(ret == 0);

                memset(&stat, 0x0, sizeof(stat));
                stat.nid = nid;
                stat.status = 0;

                /* if the node info cannot be read, fall back to the "no pools" placeholder */
                ret = node_getinfo(&info, node->key, buf);
                if (unlikely(ret)) {
                        info.dfinfo = NODETSBLE_DFINFO_NONE;
                }

                ret = __nodetable_add(node->key, &stat, info.dfinfo);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        free_etcd_node(list);

out:
        return 0;
err_free:
        free_etcd_node(list);
err_ret:
        return ret;
}

/**
 * Connection check for the nodes added by __nodetable_load().
 *
 * __nodetable_add() may register a node in the diskmap, but if that node is
 * in fact offline nothing takes it out again, because
 * __nodetable_reset_handler() is never called for it.
 * Therefore, one minute after nodetable_init(), every node's connection is
 * checked and nodes that are not connected are taken offline.
 */
/* timer handle armed by nodetable_init() to run __nodetable_check_connection() */
static worker_handler_t nodetable_handler;
/* defined further down; needed by __nodetable_check_connection__() */
STATIC int __nodetable_offline_nolock(const char *name);

/**
 * Iteration callback: take one node offline if it is not connected.
 *
 * @param _arg unused
 * @param _ent the entry_t being visited
 * @return 0 when the node is connected or was taken offline successfully,
 *         otherwise the error from __nodetable_offline_nolock()
 */
STATIC int __nodetable_check_connection__(void *_arg, void *_ent)
{
        int ret;
        net_handle_t nh;
        entry_t *ent = _ent;

        (void) _arg;

        id2nh(&nh, &ent->stat.nid);
        DINFO("check node %s\n", ent->name);

        /* connected nodes need no further action */
        if (netable_connected(&nh.u.nid))
                return 0;

        DINFO("node %s offline\n", ent->name);
        ret = __nodetable_offline_nolock(ent->name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Timer callback: run __nodetable_check_connection__() over every node.
 *
 * On success the function returns 0 and does not re-arm the timer. On any
 * failure it logs the error and re-arms the timer for another USEC_PER_MIN;
 * the value returned in that case is the result of timer1_settime(), not the
 * original error.
 *
 * @param _args unused
 */
STATIC int __nodetable_check_connection(void *_args)
{
        int ret;

        (void) _args;

        /* cheap check before taking the lock */
        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        /* NOTE(review): the callback changes entry state via
         * __nodetable_offline_nolock() while only the read lock is held --
         * confirm this is intended */
        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* re-check now that the lock is held */
        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                DERROR("nodetable not init\n");
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }
        
        ret = hash_iterate_table_entries(nodetable.tab, __nodetable_check_connection__, NULL);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&nodetable.lock);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        DERROR("nodetable check connection error ret (%d) %s\n", ret, strerror(ret));

        /* try again in a minute; ret is overwritten here */
        ret = timer1_settime(&nodetable_handler, USEC_PER_MIN);
        if (unlikely(ret)) {
                DERROR("settime error ret %d\n", ret);
                YASSERT(0);
        }

        return ret;
}

/**
 * Initialise the node table and its companions, load the persisted nodes and
 * arm the one-minute connection-check timer.
 *
 * Must be called while the table is not initialised (asserted).
 *
 * @return 0 on success, ENOMEM if the hash table cannot be created, otherwise
 *         the error of the failing step. Nothing already set up is undone on
 *         failure.
 */
int nodetable_init()
{
        int ret;

        YASSERT(nodetable.inited == 0);

        nodetable.tab = hash_create_table(__cmp, __key, "nodetable");
        if (nodetable.tab == 0) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_init(&nodetable.lock, "nodetable.lock");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* marked initialised before loading: __nodetable_add() refuses to run otherwise */
        nodetable.inited = 1;
        nodetable.count = 0;

        ret = diskmap_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = poolmap_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = nodeid_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __nodetable_load();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* schedule the connection check for the nodes just loaded */
        ret = timer1_create(&nodetable_handler, "nodetable_check_connection", __nodetable_check_connection, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = timer1_settime(&nodetable_handler, USEC_PER_MIN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Keep the diskmap in step with a status change: add the node when it turns
 * writeable, delete it when it stops being writeable, otherwise do nothing.
 */
STATIC void __nodetable_check_writeable(entry_t *ent, const nodestat_t *stat)
{
        int was_writeable = node_stat_writeable(ent->stat.status);
        int is_writeable = node_stat_writeable(stat->status);

        if (is_writeable) {
                if (!was_writeable)
                        diskmap_add(ent->name, &stat->nid);
        } else if (was_writeable) {
                diskmap_del(ent->name);
        }
}

/* Return the index of the pool named `pool` in dfree->pool_stat, or -1 if absent. */
STATIC int __nodetable_dfinfo_found(nodedfree_t *dfree, const char *pool)
{
        int idx = 0;

        while (idx < dfree->pool_count) {
                if (strcmp(dfree->pool_stat[idx].name, pool) == 0)
                        return idx;
                idx++;
        }

        return -1;
}

/* Non-zero when pool i has more than cdsconf.disk_keep of (total - used) left. */
STATIC int __nodetable_dfinfo_writeable(nodedfree_t *dfree, int i)
{
        return cdsconf.disk_keep < dfree->pool_stat[i].total - dfree->pool_stat[i].used;
}

/**
 * Bring the poolmap in line with a node's new pool information.
 *
 * The entry's current dfinfo string and the new one are both decoded and
 * compared pool by pool:
 *  - pool only in the new info and writeable: inserted into the poolmap;
 *  - pool in both, newly writeable: inserted;
 *  - pool in both, no longer writeable: removed;
 *  - pool only in the old info: removed.
 * ent->dfinfo itself is not modified here.
 *
 * @return 0 on success, or the ymalloc() error. The results of
 *         dfinfo_decode() are not checked.
 */
STATIC int __nodetable_check_dfinfo(entry_t *ent, const char *dfinfo, const nodestat_t *stat)
{
        int ret, i, idx;
        nodedfree_t *dfree_old, *dfree_new;

        /* one allocation holds both decoded structures: [0] old, [1] new */
        ret = ymalloc((void **)&dfree_old, sizeof(*dfree_old) * 2);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        dfree_new = dfree_old + 1;

        DBUG("nodetable %s old %s new %s\n", ent->name, ent->dfinfo, dfinfo);

        dfinfo_decode(ent->dfinfo, strlen(ent->dfinfo) + 1, dfree_old);
        dfinfo_decode(dfinfo, strlen(dfinfo) + 1, dfree_new);

        /* pass 1: walk the new pools and handle additions and writeability flips */
        for (i = 0; i < dfree_new->pool_count; i ++) {
                idx = __nodetable_dfinfo_found(dfree_old, dfree_new->pool_stat[i].name);
                if (idx == -1) {
                        if (__nodetable_dfinfo_writeable(dfree_new, i)) {
                                DINFO("poolmap insert node %s pool %s old:%s new:%s\n",
                                                ent->name, dfree_new->pool_stat[i].name, ent->dfinfo, dfinfo);
                                poolmap_insert(ent->name, dfree_new->pool_stat[i].name, &stat->nid);
                        } else {
                                DINFO("poolmap node %s pool %s writeable %d old:%s new:%s\n",
                                                ent->name, dfree_new->pool_stat[i].name,
                                                __nodetable_dfinfo_writeable(dfree_new, i), ent->dfinfo, dfinfo);
                        }
                } else {
                        if (__nodetable_dfinfo_writeable(dfree_new, i) && !__nodetable_dfinfo_writeable(dfree_old, idx)) {
                                DINFO("poolmap insert node %s pool %s old:%s new:%s\n",
                                                ent->name, dfree_new->pool_stat[i].name, ent->dfinfo, dfinfo);
                                poolmap_insert(ent->name, dfree_new->pool_stat[i].name, &stat->nid);
                        } else if (!__nodetable_dfinfo_writeable(dfree_new, i) && __nodetable_dfinfo_writeable(dfree_old, idx)) {
                                DINFO("poolmap remove node %s pool %s old:%s new:%s\n",
                                                ent->name, dfree_new->pool_stat[i].name, ent->dfinfo, dfinfo);
                                poolmap_remove(ent->name, dfree_new->pool_stat[i].name);
                        }
                }
        }

        /* pass 2: pools that disappeared from the node are removed */
        for (i = 0; i < dfree_old->pool_count; i ++) {
                idx = __nodetable_dfinfo_found(dfree_new, dfree_old->pool_stat[i].name);
                if (idx == -1) {
                        DINFO("poolmap remove node %s pool %s old:%s new:%s\n",
                                        ent->name, dfree_old->pool_stat[i].name, ent->dfinfo, dfinfo);
                        poolmap_remove(ent->name, dfree_old->pool_stat[i].name);
                }
        }

        yfree((void **)&dfree_old);

        return 0;
err_ret:
        return ret;
}

/**
 * Remove every writeable pool listed in ent->dfinfo from the poolmap and
 * reset ent->dfinfo to the "no pools" placeholder.
 *
 * @return 0 on success, or the ymalloc() error (nothing is changed then)
 */
STATIC int __nodetable_remove_dfinfo(entry_t *ent)
{
        int ret, i;
        nodedfree_t *decoded;

        ret = ymalloc((void **)&decoded, sizeof(*decoded));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        dfinfo_decode(ent->dfinfo, strlen(ent->dfinfo) + 1, decoded);

        for (i = 0; i < decoded->pool_count; i ++) {
                /* pools that were not writeable are skipped */
                if (!__nodetable_dfinfo_writeable(decoded, i))
                        continue;

                poolmap_remove(ent->name, decoded->pool_stat[i].name);
        }

        strcpy(ent->dfinfo, NODETSBLE_DFINFO_NONE);

        yfree((void **)&decoded);

        return 0;
err_ret:
        return ret;
}

STATIC int __nodetable_add(const char *name, const nodestat_t *stat, const char *dfinfo)
{
        int ret;
        char *pname = NULL;
#if LOCK_DEBUG
        char lname[MAX_LOCK_NAME];
#endif
        entry_t *ent;

        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_wrlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        DINFO("add node %s dfinfo %s\n", name, dfinfo);

        ent = hash_table_find(nodetable.tab, (void *)name);
        if (ent == NULL) {
                ret = ymalloc((void **)&ent, sizeof(*ent) + strlen(name) + 1);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

#if LOCK_DEBUG
                pname = lname;
                sprintf(pname, "nodetable.ent.%d", stat->nid.id);
#endif

                ret = sy_rwlock_init(&ent->lock, pname);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                strcpy(ent->name, name);
                strcpy(ent->dfinfo, NODETSBLE_DFINFO_NONE);
                ent->stat = *stat;
                ent->local_vols = hash_create_table(__hash_local_vol_cmp, __hash_local_vol_key, "local_vols");

#if 1
                if (node_stat_writeable(ent->stat.status)) {
                        diskmap_add(ent->name, &ent->stat.nid);
                } else {
                        DWARN("node %s dfinfo %s not writeable\n", name, dfinfo);
                }
                __nodetable_check_dfinfo(ent, dfinfo, stat);
                strcpy(ent->dfinfo, dfinfo);
#endif

                ret = hash_table_insert(nodetable.tab, (void *)ent,
                                        (void *)&ent->name, 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                nodetable.count++;
        } else {
                ret = EEXIST;
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&nodetable.lock);

        return 0;
err_free:
        yfree((void **)&ent);
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/**
 * Replace the status and pool information of an existing node.
 *
 * The diskmap and poolmap are adjusted for the transition from the old to
 * the new state before the entry is overwritten.
 *
 * @param name   node to update
 * @param dfinfo new encoded pool information; must fit into entry_t.dfinfo
 * @param stat   new status, copied into the entry
 * @return 0 on success; ENOSYS if the table is not initialised, ENOENT if
 *         the node is unknown, EINVAL if dfinfo is too long, otherwise the
 *         lock error
 */
STATIC int __nodetable_update(const char *name, const char *dfinfo, const nodestat_t *stat)
{
        int ret;
        entry_t *ent;

        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_wrlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* re-check under the lock */
        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        ent = hash_table_find(nodetable.tab, (void *)name);
        if (ent == NULL) {
                ret = ENOENT;
                GOTO(err_lock, ret);
        }

        /* reject oversized input before anything is modified: dfinfo is
         * copied into the fixed-size ent->dfinfo below */
        if (strlen(dfinfo) >= sizeof(ent->dfinfo)) {
                DERROR("node %s dfinfo too long\n", name);
                ret = EINVAL;
                GOTO(err_lock, ret);
        }

        __nodetable_check_writeable(ent, stat);
        __nodetable_check_dfinfo(ent, dfinfo, stat);

        ent->stat = *stat;
        strcpy(ent->dfinfo, dfinfo);

        sy_rwlock_unlock(&nodetable.lock);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/* Argument for __nodetable_cleanup_local_vols(). */
typedef struct {
        /* the volume table being cleaned */
        hashtable_t tab;
        /* timestamp of the current sync; entries with a different stamp are removed */
        time_t time;
} clean_arg_t;

/**
 * Iteration callback used after a LOCAL_VOL_SYNC: drop every volume whose
 * timestamp differs from the sync's timestamp.
 *
 * @param _arg clean_arg_t with the table and the sync timestamp
 * @param _ent the entry_vol_t being visited
 * @return 0 on success, otherwise the hash_table_remove() error
 */
STATIC int __nodetable_cleanup_local_vols(void *_arg, void *_ent)
{
        int ret;
        clean_arg_t *arg = _arg;
        entry_vol_t *ent_vol = _ent;

        if (arg->time != ent_vol->last_notifiction) {
                ret = hash_table_remove(arg->tab, &ent_vol->chkid, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* the table no longer references the entry; release it
                 * (only after the remove, because the key lives inside it) */
                yfree((void **)&ent_vol);
        }

        return 0;
err_ret:
        return ret;
}

int nodetable_update_local_vols(const char *name, const local_vol_t *local_vols, int options)
{
        int ret;
        uint64_t i;
        time_t now;
        entry_t *ent;
        clean_arg_t arg;
        entry_vol_t *ent_vol;
        const chkid_t *chkid;

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_lock1, ret);
        }

        ent = hash_table_find(nodetable.tab, (void *)name);
        if (ent == NULL) {
                ret = ENOENT;
                GOTO(err_lock1, ret);
        }

        ret = sy_rwlock_wrlock(&ent->lock);
        if (unlikely(ret))
                GOTO(err_lock1, ret);

        now = gettime();

        for (i = 0; i < local_vols->volcount; i++) {
                chkid = &local_vols->vols[i];
                ent_vol = hash_table_find(ent->local_vols, chkid);
                if (options == LOCAL_VOL_SYNC || options == LOCAL_VOL_ADD) {
                        if (ent_vol) {
                                ent_vol->last_notifiction = now;
                        } else {
                                ret = ymalloc((void **)&ent_vol, sizeof(*ent_vol));
                                if (unlikely(ret))
                                        GOTO(err_lock2, ret);

                                ent_vol->chkid = *chkid;
                                ent_vol->last_notifiction = now;
                                hash_table_insert(ent->local_vols, ent_vol, &ent_vol->chkid, 0);
                        }
                } else if (options == LOCAL_VOL_DEL) {
                        if (ent_vol) {
                                ret = hash_table_remove(ent->local_vols, chkid, NULL);
                                if (unlikely(ret))
                                        YASSERT(0);
                        } else {
                                DWARN("notifiction: rm vol vol not exist!\n");
                                continue;
                        }
                }
        }

        if (options == LOCAL_VOL_SYNC) {
                arg.tab = ent->local_vols;
                arg.time = now;
                hash_iterate_table_entries(ent->local_vols, __nodetable_cleanup_local_vols, &arg);
        }

        sy_rwlock_unlock(&ent->lock);
        sy_rwlock_unlock(&nodetable.lock);

        return 0;

err_lock2:
        sy_rwlock_unlock(&ent->lock);
err_lock1:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/* Destructor passed to hash_destroy_table() for volume tables: frees one entry_vol_t. */
STATIC void __local_vols_destroy(void *_ent)
{
        entry_vol_t *vol = _ent;

        yfree((void **)&vol);
}

/**
 * Remove a node: delete its etcd key, take it out of the table, release its
 * node id and its diskmap/poolmap registrations, and free the entry.
 *
 * Once the entry has left the table its resources are always released; a
 * failure of nodeid_drop() is logged and returned after the cleanup.
 *
 * @param name node to remove
 * @return 0 on success; ENOSYS if the table is not initialised, otherwise
 *         the error of the failing call
 */
int nodetable_del(const char *name)
{
        int ret;
        entry_t *ent;

        DINFO("remove %s\n", name);

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_wrlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        ret = etcd_del(ETCD_NODE, name);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        ret = hash_table_remove(nodetable.tab, name, (void **)&ent);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        /* from here on the entry is owned by this function and must be
         * released on every path */
        hash_destroy_table(ent->local_vols, __local_vols_destroy);

        ret = nodeid_drop(ent->stat.nid.id);
        if (unlikely(ret))
                DERROR("node %s drop id fail, ret (%d)\n", name, ret);

        if (node_stat_writeable(ent->stat.status))
                diskmap_del(ent->name);
        __nodetable_remove_dfinfo(ent);

        yfree((void **)&ent);
        nodetable.count--;

        sy_rwlock_unlock(&nodetable.lock);

        /* report a nodeid_drop() failure, now that nothing is leaked */
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/**
 * Report the number of nodes in the table.
 *
 * @param count receives nodetable.count
 * @return 0 on success, ENOSYS if the table is not initialised
 */
int nodetable_count(int *count)
{
        int ret;

        if (!nodetable.inited) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        YASSERT(nodetable.inited);

        /* read without taking the table lock */
        *count = nodetable.count;

        return 0;
err_ret:
        return ret;
}

/*
static int __nodetable_list(void *_arg, void *_ent)
{
        int ret;
        entry_t *ent = _ent;
        arg_t *arg = _arg;
        struct dirent *de;

        if ((int)(sizeof(*de) + strlen(ent->name) + 1) > arg->left) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        de = arg->de;
        memset(de, 0x0, sizeof(*de));
        de->d_reclen = sizeof(*de) + strlen(ent->name) + 1;
        //de->d_off = (loc + 1) * CHUNKTABLE_DT_REC_SIZE;
        strcpy(de->d_name, ent->name);
        arg->left -= de->d_reclen;
        arg->de = (void *)arg->de + de->d_reclen;

        return 0;
err_ret:
        return ret;
}
*/

/**
 * Iteration callback: append one node as a fixed-size dirent record to the
 * list file.
 *
 * Each record is sizeof(struct dirent) + MAX_NAME_LEN bytes long; the node
 * name starts at d_name and may extend into the extra MAX_NAME_LEN bytes.
 * The record is built in a buffer of exactly that size, so the write never
 * reads beyond the object.
 *
 * @param _arg arg_t with the target fd and the current write offset
 * @param _ent the entry_t being visited
 * @return 0 on success, ENAMETOOLONG if the name does not fit into a record,
 *         otherwise the (positive) _pwrite() error
 */
STATIC int __nodetable_list_to_file(void *_arg, void *_ent)
{
        int ret;
        entry_t *ent = _ent;
        arg_t *arg = _arg;
        /* the union gives dirent alignment and the full record size */
        union {
                struct dirent de;
                char raw[sizeof(struct dirent) + MAX_NAME_LEN];
        } rec;
        size_t name_off, name_len;

        memset(&rec, 0x0, sizeof(rec));

        name_off = (size_t)(rec.de.d_name - rec.raw);
        name_len = strlen(ent->name);
        if (name_len >= sizeof(rec.raw) - name_off) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        rec.de.d_reclen = sizeof(rec.de) + MAX_NAME_LEN;
        /* copy through raw so the destination bound is the whole record */
        memcpy(rec.raw + name_off, ent->name, name_len + 1);

        DBUG("fd[%u]: %s\n", arg->fd, ent->name);
        ret = _pwrite(arg->fd, &rec, rec.de.d_reclen, arg->offset);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        arg->offset += rec.de.d_reclen;

        return 0;
err_ret:
        return ret;
}

/**
 * Write the terminating record of a node list: a record of the usual size
 * (sizeof(struct dirent) + MAX_NAME_LEN) with an empty name.
 *
 * The record is built in a buffer of the full record size so the write does
 * not read beyond the object.
 *
 * @param fd     list file
 * @param offset byte offset at which the record is written (also stored in d_off)
 * @return 0 on success, otherwise the (positive) _pwrite() error
 */
STATIC int __nodelist_write_end_flag(int fd, uint64_t offset)
{
        /* the union gives dirent alignment and the full record size */
        union {
                struct dirent de;
                char raw[sizeof(struct dirent) + MAX_NAME_LEN];
        } rec;
        int ret;

        /* zero fill leaves d_name empty, which marks the end of the list */
        memset(&rec, 0x0, sizeof(rec));
        rec.de.d_name[0] = '\0';
        rec.de.d_reclen = sizeof(rec.de) + MAX_NAME_LEN;
        rec.de.d_off = offset;

        ret = _pwrite(fd, &rec, rec.de.d_reclen, offset);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Snapshot the node list into a new pool_list file and register it under
 * `uuid` for later reading with nodetable_list().
 *
 * @param uuid key under which the snapshot file is registered
 * @return 0 on success; ENOSYS if the table is not initialised, otherwise the
 *         error of the failing call (the file is closed again on failure)
 */
int nodetable_list_open(const char *uuid)
{
        int ret, fd;
        arg_t arg;

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = pool_list_newfd(&fd);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* de/left are not used by the callback; clear them anyway */
        memset(&arg, 0x0, sizeof(arg));
        arg.fd = fd;
        arg.offset = 0;

        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        /* re-check under the lock, as the other table accessors do:
         * nodetable_destroy() sets tab to NULL */
        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        ret = hash_iterate_table_entries(nodetable.tab, __nodetable_list_to_file,  &arg);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&nodetable.lock);

        ret = __nodelist_write_end_flag(fd, arg.offset);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        ret = pool_list_addfd(uuid, fd);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_fd:
        close(fd);
err_ret:
        return ret;
}

/* Read from the snapshot registered under `uuid`; forwards to pool_list_lspool(). */
int nodetable_list(char *buf, int *len, const char *uuid, uint64_t offset)
{
        int ret;

        ret = pool_list_lspool(uuid, offset, buf, len);

        return ret;
}

/* Release the snapshot registered under `uuid`; forwards to pool_list_closefd(). */
int nodetable_list_close(const char *uuid)
{
        int ret;

        ret = pool_list_closefd(uuid);

        return ret;
}

/* Search state for __local_vols_cmp(). */
typedef struct {
        /* receives the nid of the best node found so far */
        nid_t *nid;
        /* site to restrict the search to; tested with cluster_storage_area_is_null() */
        const char *site_name;
        /* volume count of the best node found so far */
        unsigned int volcount;
        /* 0 until the first candidate has been recorded */
        int found;
} cmp_arg_t;

/**
 * Iteration callback: remember the writeable node with the fewest volumes,
 * optionally restricted to one site. A later node with an equal count
 * replaces an earlier one.
 *
 * @param _arg cmp_arg_t search state
 * @param _ent the entry_t being visited
 * @return always 0
 */
static int __local_vols_cmp(void *_arg, void *_ent)
{
        cmp_arg_t *arg = _arg;
        entry_t *ent = _ent;
        char _site[HOST_NAME_MAX];

        disk2site(ent->name, _site);

        // if site is limited, skip other site
        if (!cluster_storage_area_is_null(arg->site_name) && strcmp(_site, arg->site_name) != 0)
                return 0;

        if (!node_stat_writeable(ent->stat.status))
                return 0;

        /* keep the current best unless this node has no more volumes than it */
        if (arg->found != 0 && ent->local_vols->num_of_entries > arg->volcount)
                return 0;

        *(arg->nid) = ent->stat.nid;
        arg->volcount = ent->local_vols->num_of_entries;
        arg->found = 1;

        return 0;
}

/**
 * Find the writeable node with the fewest local volumes, optionally limited
 * to one site. The table is iterated without taking the table lock here.
 *
 * @param site_name site restriction, see __local_vols_cmp()
 * @param nid       receives the chosen node's nid
 * @return 0 on success, ENOSPC if no node qualified, or the iteration error
 */
STATIC int __nodetable_get_local_vol_min_insite(const char *site_name, nid_t *nid)
{
        int ret;
        cmp_arg_t search;

        search.found = 0;
        search.volcount = 0;
        search.site_name = site_name;
        search.nid = nid;

        ret = hash_iterate_table_entries(nodetable.tab, __local_vols_cmp, &search);
        if (unlikely(ret))
                return ret;

        if (unlikely(search.found == 0))
                ret = ENOSPC;

        DINFO("vol notifiction get local vol min node %d\n", nid->id);

        return ret;
}

/**
 * Pick `repnum` disks from `pool` via diskmap_get() while holding the table
 * read lock, and assert that every returned id is non-null.
 *
 * @return 0 on success; ENOSYS if the table is not initialised, otherwise
 *         the lock or diskmap_get() error
 */
int nodetable_newdisk(const char *pool, diskid_t *diskid, int repnum,
                         const nid_t *skip, int skip_count, int flag)
{
        int ret, i = 0;

        if (!nodetable.inited) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the table may have been torn down while waiting for the lock */
        if (!nodetable.inited) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        ret = diskmap_get(pool, diskid, repnum, skip, skip_count, flag);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        sy_rwlock_unlock(&nodetable.lock);

        i = 0;
        while (i < repnum) {
                YASSERT(!net_isnull(&diskid[i]));
                i++;
        }

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/*
 * Destructor passed to hash_destroy_table() for the node table: releases the
 * node's volume table, its diskmap/poolmap registrations and its network
 * connection, then frees the entry.
 */
STATIC void __nodetable_destroy(void *_ent)
{
        entry_t *node = _ent;

        hash_destroy_table(node->local_vols, __local_vols_destroy);

        if (node_stat_writeable(node->stat.status)) {
                diskmap_del(node->name);
        }

        __nodetable_remove_dfinfo(node);
        network_close(&node->stat.nid, "nodetable", NULL);

        yfree((void **)&node);
}

/**
 * Tear down the node table: every entry is destroyed with
 * __nodetable_destroy(), then the diskmap and poolmap are destroyed.
 *
 * The table must be initialised (asserted). A failure to take the write lock
 * is treated as fatal.
 */
void nodetable_destroy()
{
        int ret;
        YASSERT(nodetable.inited);

        /* cleared before the lock is taken, so callers that test inited bail out */
        nodetable.inited = 0;
        
        ret = sy_rwlock_wrlock(&nodetable.lock);
        if (unlikely(ret))
                YASSERT(0);

        hash_destroy_table(nodetable.tab, __nodetable_destroy);

        /* functions that re-check under the lock test tab against NULL */
        nodetable.tab = NULL;
        sy_rwlock_unlock(&nodetable.lock);

        diskmap_destroy();
        poolmap_destroy();
}

/**
 * Copy the status of node `name` into *stat under the table read lock.
 *
 * @return 0 on success; ENOSYS if the table is not initialised, ENOENT if
 *         the node is unknown, otherwise the lock error
 */
STATIC int __nodetable_get(const char *name, nodestat_t *stat)
{
        int ret;
        entry_t *found;

        if (!nodetable.inited) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* check again now that the lock is held */
        if (!nodetable.inited) {
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        DBUG("get node %s\n", name);

        found = hash_table_find(nodetable.tab, (void *)name);
        if (found != NULL) {
                *stat = found->stat;

                sy_rwlock_unlock(&nodetable.lock);

                return 0;
        }

        DINFO("get node %s fail\n", name);
        ret = ENOENT;
        GOTO(err_lock, ret);

err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/*
 * Reset handler registered with netable_add_reset_handler() in
 * nodetable_update(): takes the named node offline. `peer` is unused and the
 * result of nodetable_offline() is ignored.
 */
static void  __nodetable_reset_handler(const nid_t *peer, const char *nodename)
{
        DINFO("nodetable reset handler %s\n", nodename);

        (void) peer;
        nodetable_offline(nodename);
}

/**
 * Register a brand-new node: allocate a node id, persist "name -> nid" in
 * etcd and add the node to the table with an empty status and no pools.
 *
 * @param name node name
 * @param nid  nid->id receives the allocated id
 * @return 0 on success; ENOSYS if the table is not initialised, otherwise
 *         the error of the failing call
 *
 * NOTE(review): when etcd_create_text() or __nodetable_add() fails, the id
 * obtained from nodeid_newid() (and, in the latter case, the etcd key) is
 * not rolled back -- confirm whether that is intended.
 */
int nodetable_new(const char *name, nid_t *nid)
{
        int ret;
        nodestat_t stat;
        char tmp[MAX_BUF_LEN];

        /* like the other entry points: do not touch the lock, allocate an id
         * or write to etcd before the table has been initialised */
        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        /* id allocation and the etcd write are serialised by the table lock */
        ret = sy_rwlock_wrlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = nodeid_newid(&nid->id, name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        nid2str(tmp, nid);
        ret = etcd_create_text(ETCD_NODE, name, tmp, 0);
        if (unlikely(ret)) {
                GOTO(err_lock, ret);
        }

        /* released here because __nodetable_add() takes the write lock itself */
        sy_rwlock_unlock(&nodetable.lock);

        memset(&stat, 0x0, sizeof(stat));
        stat.nid = *nid;

        ret = __nodetable_add(name, &stat, NODETSBLE_DFINFO_NONE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("add node %s nid %d\n", name, nid->id);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}


/* Public lookup: logs the request and forwards to __nodetable_get(). */
int nodetable_get(const char *name, nodestat_t *stat)
{
        int ret;

        DINFO("get %s\n", name);

        ret = __nodetable_get(name, stat);

        return ret;
}

/**
 * Apply a status report from a node.
 *
 * The node must already be in the table. If its previous status was 0 and it
 * is not the local node, a network connection is opened and a reset handler
 * is installed; if either step fails the node is taken offline again.
 *
 * @return 0 on success; EPERM if the node is unknown, otherwise the error of
 *         the failing call
 */
int nodetable_update(const char *name, const char *dfinfo, const nodestat_t *_stat)
{
        int ret;
        net_handle_t nh;
        nodestat_t stat;

        DBUG("disk %s join, writeable %u\n", name, node_stat_writeable(_stat->status));

        /* snapshot of the status before the update */
        ret = __nodetable_get(name, &stat);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DWARN("node %s not exist, eperm\n", name);
                        ret = EPERM;
                }

                GOTO(err_ret, ret);
        }

        ret = __nodetable_update(name, dfinfo, _stat);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* only a remote node whose previous status was 0 needs connecting */
        if (stat.status != 0 || net_islocal(&_stat->nid))
                return 0;

        id2nh(&nh, &_stat->nid);
        ret = network_connect(&nh.u.nid, NULL, 1, 0);
        if (unlikely(ret))
                GOTO(err_offline, ret);

        ret = netable_add_reset_handler(&_stat->nid, __nodetable_reset_handler);
        if (unlikely(ret)) {
                GOTO(err_offline, ret);
        }

        return 0;
err_offline:
        nodetable_offline(name);
err_ret:
        return ret;
}

/**
 * Take a node offline without touching the table lock (the caller holds it):
 * remove it from the diskmap if it was writeable, remove its pools from the
 * poolmap and reset its status to 0.
 *
 * @return 0 on success, ENOENT if the node is unknown
 */
STATIC int __nodetable_offline_nolock(const char *name)
{
        int ret;
        entry_t *node = hash_table_find(nodetable.tab, (void *)name);

        if (node == NULL) {
                DERROR("nodetable not found %s\n", name);
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        if (node_stat_writeable(node->stat.status)) {
                diskmap_del(node->name);
        }

        __nodetable_remove_dfinfo(node);
        node->stat.status = 0;

        return 0;
err_ret:
        return ret;
}

/**
 * Take a node offline (see __nodetable_offline_nolock()) and record the time
 * of the event in __last_reset__.
 *
 * @param name node to take offline
 * @return 0 on success; ENOSYS if the table is not initialised, ENOENT if
 *         the node is unknown, otherwise the lock error
 */
int nodetable_offline(const char *name)
{
        int ret;

        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                DERROR("nodetable not init\n");
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        /* NOTE(review): the entry is modified while only the read lock is
         * held -- confirm this is intended */
        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* re-check under the lock */
        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                DERROR("nodetable not init\n");
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }
        
        ret = __nodetable_offline_nolock(name);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        sy_rwlock_unlock(&nodetable.lock);

        /* written after the lock has been released */
        __last_reset__ = gettime();

        DINFO("nodetable %s offline\n", name);
        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

/**
 * Check that every node in diskid[0..count) is known and writeable.
 *
 * @param diskid node ids to check
 * @param count  number of ids
 * @return 0 if all nodes are writeable; ENOSPC if a node is unknown or not
 *         writeable; ENOSYS if the table is not initialised; otherwise the
 *         lock or name-resolution error
 *
 * NOTE(review): __nodetable_get() takes the table read lock again while it is
 * already held here -- confirm sy_rwlock allows nested read locks.
 */
int nodetable_writeable(const nid_t *diskid, int count)
{
        int ret, i;
        nodestat_t stat;
        char host[MAX_NAME_LEN];

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        ret = sy_rwlock_rdlock(&nodetable.lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (nodetable.inited == 0 || nodetable.tab == NULL) {
                DERROR("nodetable not init\n");
                ret = ENOSYS;
                GOTO(err_lock, ret);
        }

        for (i = 0; i < count; i++) {
                ret = network_rname1(&diskid[i], host);
                if (unlikely(ret)) {
                        /* the read lock is held here: leave via err_lock so it is released */
                        GOTO(err_lock, ret);
                }

                ret = __nodetable_get(host, &stat);
                if (unlikely(ret)) {
                        ret = ENOSPC;
                        GOTO(err_lock, ret);
                }

                if (!node_stat_writeable(stat.status)) {
                        ret = ENOSPC;
                        GOTO(err_lock, ret);
                }
        }

        sy_rwlock_unlock(&nodetable.lock);

        return 0;
err_lock:
        sy_rwlock_unlock(&nodetable.lock);
err_ret:
        return ret;
}

#if 1
/**
 * Compare the host parts of two node names, where the host part is everything
 * before the first '/' (or the whole string if there is none).
 *
 * @return 0 if the host parts are equal; otherwise the length difference if
 *         the lengths differ, or the memcmp() result
 */
int __nodetable_hostcmp(const char *host1, const char *host2)
{
        const char *slash1 = strchr(host1, '/');
        const char *slash2 = strchr(host2, '/');
        int len1, len2;

        if (slash1 != NULL)
                len1 = slash1 - host1;
        else
                len1 = strlen(host1);

        if (slash2 != NULL)
                len2 = slash2 - host2;
        else
                len2 = strlen(host2);

        DBUG("cmp %s, %s, len1 %u, len2 %u\n", host1, host2, len1, len2);

        if (len1 != len2)
                return len1 - len2;

        DBUG("%s, %s len %u\n", host1, host2, len1);
        return memcmp(host1, host2, len1);
}

/* Iteration callback for nodetable_dump(): logs one volume at debug level. */
STATIC int __dump_local_vols(void *_arg, void *_ent)
{
        entry_vol_t *vol = _ent;

        (void) _arg;

        DBUG("nodetable_dump "CHKID_FORMAT" last:%u\n", CHKID_ARG(&vol->chkid),
                        (int)vol->last_notifiction);

        return 0;
}

/* Iteration callback for nodetable_dump(): logs one node and then its volumes. */
STATIC int __dump_nodes(void *_arg, void *_ent)
{
        entry_t *node = _ent;

        (void) _arg;

        DINFO("nodetable_dump ent: %s volcount:%d\n", node->name,
                        node->local_vols->num_of_entries);

        hash_iterate_table_entries(node->local_vols, __dump_local_vols, NULL);

        return 0;
}

/* Log every node and its volumes between two banner lines; no lock is taken. */
void nodetable_dump()
{
        DINFO("========nodetable dump========\n");

        hash_iterate_table_entries(nodetable.tab, __dump_nodes, NULL);

        DINFO("========nodetable dump========\n");
}

/**
 * Tell whether the cluster consists of a single host, i.e. whether all node
 * names share the same host part (see __nodetable_hostcmp()).
 *
 * The answer is cached: within 60 seconds of the last completed check the
 * previous result is returned without looking at the table.
 *
 * The host part of the first node is copied aside and kept for the whole
 * scan, so nodes delivered in different read batches are compared against
 * the same reference.
 *
 * @return 1 if only one host is present, 0 otherwise or on any error
 */
int nodetable_standalone()
{
        int ret, delen, have_first = 0, done = 0;
        char de0[MAX_BUF_LEN];
        char first[MAX_NAME_LEN];
        size_t len;
        struct dirent *de;
        uint64_t offset = 0, offset2 = 0;
        static int standalone = 0, last_check = 0;
        uuid_t _uuid;
        char uuid[MAX_NAME_LEN] = {};

        if (nodetable.inited == 0) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        if (gettime() - last_check < 60) {
                return standalone;
        }

        /* a fresh uuid names the private snapshot of the node list */
        uuid_generate(_uuid);
        uuid_unparse(_uuid, uuid);

        ret = nodetable_list_open(uuid);
        if (ret)
                GOTO(err_ret, ret);

        while (done == 0) {
                delen = MAX_BUF_LEN;
                memset(de0, 0, sizeof(de0));
                ret = nodetable_list(de0, &delen, uuid, offset);
                if (unlikely(ret))
                        GOTO(err_close, ret);

                if (delen == 0)
                        break;

                offset2 = 0;
                dir_for_each(de0, delen, de, offset2) {
                        if (strlen(de->d_name) == 0) {
                                /* empty name: end-of-list record */
                                done = 1;
                                break;
                        } else if (delen - offset2 < sizeof(*de) + MAX_NAME_LEN)
                                break;

                        DBUG("name %s\n", de->d_name);
                        offset += de->d_reclen;

                        if (!have_first) {
                                /* de0 is overwritten by the next batch, so keep a copy */
                                len = strlen(de->d_name);
                                if (len >= sizeof(first))
                                        len = sizeof(first) - 1;
                                memcpy(first, de->d_name, len);
                                first[len] = '\0';
                                have_first = 1;
                                continue;
                        }

                        if (__nodetable_hostcmp(first, de->d_name)) {
                                standalone = 0;
                                goto out;
                        }
                }
        }

        standalone = 1;
out:
        last_check = gettime();
        nodetable_list_close(uuid);
        return standalone;
err_close:
        nodetable_list_close(uuid);
err_ret:
        return 0;
}
#endif

/* Return the later of __admin__->uptime and the last recorded node reset time. */
time_t nodetable_last_reset()
{
        if (__admin__->uptime >  __last_reset__)
                return __admin__->uptime;

        return __last_reset__;
}
